package cn.webank.framework.message.subscriber;

import java.util.List;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ReloadableResourceBundleMessageSource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import cn.webank.framework.biz.service.support.WeBankServiceDispatcher;
import cn.webank.framework.biz.utils.ServiceUtil;
import cn.webank.framework.context.WeBankContext;
import cn.webank.framework.dto.BizError;
import cn.webank.framework.dto.BizErrors;
import cn.webank.framework.exception.SysException;
import cn.webank.framework.utils.WeBankContextUtil;
import cn.webank.rmb.api.SimpleSubscriber;
import cn.webank.rmb.destination.Destination;
import cn.webank.rmb.destination.SimpleDestination;
import cn.webank.rmb.message.Message;

/**
 * 标准队列消息监听类,是由rmb主动推的模式,不建议使用
 * 
 * @author jonyang
 *
 */
@Deprecated
public class MessageBroadCastSubscriber extends SimpleSubscriber {

	private static final long serialVersionUID = 5218683221783476290L;

	private final static Logger LOG = LoggerFactory.getLogger(MessageBroadCastSubscriber.class);
	
	/**
	 * 任务执行队列
	 */
	private ThreadPoolTaskExecutor taskExecutor;

	/**
	 * 服务分发类
	 */
	private WeBankServiceDispatcher serviceDispatcher;

	/**
	 * 国际化消息管理资源类
	 */
	private ReloadableResourceBundleMessageSource bundleMessageSource;

	/**
	 * 服务方系统ID4位
	 */
	private String sysId;

	/**
	 * 服务所在的dcn号
	 */
	private String dcnNo;

	@Deprecated
	/**
	 * 构造函数
	 * 原来用的rmb的thread pool，目前转回用自己的pool,注意调整rmb-client.properties 
	 * rmb.threadPool.thread.count=1 只需要一个搬运工，从rmb的thread pool搬运到sdk的thread pool
	 * rmb.threadPool.queueSize=2000
     *
	 * @param sysId
	 * @param dcnNo
	 * @param destinations
	 * @param bundleMessageSource
	 * @param serviceDispatcher
	 */
	public MessageBroadCastSubscriber(String sysId, String dcnNo, List<Destination> destinations,
			ReloadableResourceBundleMessageSource bundleMessageSource, WeBankServiceDispatcher serviceDispatcher) {
		this.sysId = sysId;
		this.dcnNo = dcnNo;
		this.serviceDispatcher = serviceDispatcher;
		this.setDestinations(destinations);
		this.bundleMessageSource = bundleMessageSource;
	}
	
	public MessageBroadCastSubscriber(String sysId, String dcnNo, List<Destination> destinations,
			ThreadPoolTaskExecutor taskExecutor, WeBankServiceDispatcher serviceDispatcher,
			ReloadableResourceBundleMessageSource bundleMessageSource) {
		this.sysId = sysId;
		this.dcnNo = dcnNo;
		this.serviceDispatcher = serviceDispatcher;
		this.setDestinations(destinations);
		this.bundleMessageSource = bundleMessageSource;
		this.taskExecutor =  taskExecutor;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * cn.webank.rmb.api.SimpleSubscriber#handle(cn.webank.rmb.message.Message)
	 */
	@Override
	public void handle(Message message) {

		LOG.debug("MessageBroadCastSubscriber begin:" + message);
		try {
			BizErrors errors = new BizErrors();

			// 1.set context bind thread
			WeBankContext context = new WeBankContext();
			context.setBizSeqNo(message.getSysHeader().getBizSeqNo());
			context.setDcnNo(dcnNo);
			context.setOrgSysId(message.getSysHeader().getOrgSysId());
			context.setLastConsumerId(message.getSysHeader().getConsumerId());
			context.setLastConsumerSeqNo(message.getSysHeader().getConsumerSeqNo());
			context.setSysId(this.sysId);
			context.setReversalSeqNo(message.getAppHeader() == null ? null : message.getAppHeader().getReversalSeqNo());
			WeBankContextUtil.setContext(context);

			// 2.dispatch service
			Destination destination = message.getDestination();
			if (destination instanceof SimpleDestination) {
//				String serviceId = ((SimpleDestination) destination).getServiceOrEventId();
//				String scenario = ((SimpleDestination) destination).getScenario();
//				String organizationId = message.getSysHeader().getOrganizationId();
//				String version = message.getSysHeader().getVersion();

				Runnable r = ServiceUtil.constructExcutorTask(message, dcnNo, serviceDispatcher, null,
						bundleMessageSource, this.sysId, errors, Locale.CHINESE);
				taskExecutor.submit(r);

				// serviceDispatcher.dispatch(organizationId,serviceId,
				// scenario,version,
				// message, errors);

				if (errors.hasErrors()) {
					StringBuilder sb = new StringBuilder();

					for (BizError error : errors.getAllErrors()) {
						sb.append(this.bundleMessageSource.getMessage(error.getCode(), error.getArguments(),
								error.getDefaultMessage(), Locale.CHINESE)).append("\n");
					}

					LOG.error("bizSeqNo:" + message.getSysHeader().getBizSeqNo() + " distach service error:"
							+ sb.toString());
				}
			} else {
				throw new SysException("Destination type is not supportted!");
			}

		} catch (Exception e) {
			// 由业务服务记录错误报文
			LOG.error("bizSeqNo:" + message.getSysHeader().getBizSeqNo() + " handle message error:", e);
		} finally {
			WeBankContextUtil.unsetContext();
		}

		LOG.debug("MessageBroadCastSubscriber task end:" + message);
	}

	public ThreadPoolTaskExecutor getTaskExecutor() {
		return taskExecutor;
	}

	public WeBankServiceDispatcher getServiceDispatcher() {
		return serviceDispatcher;
	}

	public ReloadableResourceBundleMessageSource getBundleMessageSource() {
		return bundleMessageSource;
	}

	public String getSysId() {
		return sysId;
	}

	public String getDcnNo() {
		return dcnNo;
	}

	public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
	}

	public void setServiceDispatcher(WeBankServiceDispatcher serviceDispatcher) {
		this.serviceDispatcher = serviceDispatcher;
	}

	public void setBundleMessageSource(ReloadableResourceBundleMessageSource bundleMessageSource) {
		this.bundleMessageSource = bundleMessageSource;
	}

	public void setSysId(String serviceSysId) {
		this.sysId = serviceSysId;
	}

	public void setDcnNo(String dcnNo) {
		this.dcnNo = dcnNo;
	}

}
